/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * license agreements; and to You under the Apache License, version 2.0:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * This file is part of the Apache Pekko project, which was derived from Akka.
 */

/*
 * Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
 */

package org.apache.pekko.remote

import org.apache.pekko
import pekko.actor.Actor
import pekko.actor.ActorRef
import pekko.actor.InternalActorRef
import pekko.actor.Terminated
import pekko.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import pekko.dispatch.sysmsg.DeathWatchNotification

/**
 * INTERNAL API
 */
private[pekko] object RemoteDeploymentWatcher {
  final case class WatchRemote(actor: ActorRef, supervisor: ActorRef)
}

/**
 * INTERNAL API
 *
 * Responsible for cleaning up child references of remote deployed actors when remote node
 * goes down (jvm crash, network failure), i.e. triggered by [[pekko.actor.AddressTerminated]].
 */
private[pekko] class RemoteDeploymentWatcher extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
  import RemoteDeploymentWatcher._
  var supervisors = Map.empty[ActorRef, InternalActorRef]

  def receive = {
    case WatchRemote(a, supervisor: InternalActorRef) =>
      supervisors += (a -> supervisor)
      context.watch(a)

    case t @ Terminated(a) if supervisors.isDefinedAt(a) =>
      // send extra DeathWatchNotification to the supervisor so that it will remove the child
      supervisors(a).sendSystemMessage(
        DeathWatchNotification(a, existenceConfirmed = t.existenceConfirmed, addressTerminated = t.addressTerminated))
      supervisors -= a

    case _: Terminated =>
  }
}
